home *** CD-ROM | disk | FTP | other *** search
/ Delphi Magazine Collection 2001 / Delphi Magazine Collection 20001 (2001).iso / DISKS / Issue56 / Alfresco / AAThdNCp.pas < prev    next >
Encoding:
Pascal/Delphi Source File  |  2000-03-01  |  9.2 KB  |  306 lines

  1. {*********************************************************}
  2. {* AAThdNCp                                              *}
  3. {* Copyright (c) Julian M Bucknall 1998-2000             *}
  4. {* All rights reserved.                                  *}
  5. {*********************************************************}
  6. {* Algorithms Alfresco: multithreaded multibuffered copy *}
  7. {*********************************************************}
  8.  
  9. {Note: this unit is released as freeware. In other words, you are free
  10.        to use this unit in your own applications, however I retain all
  11.        copyright to the code. JMB}
  12.  
  13. unit AAThdNCp;
  14.  
  15. interface
  16.  
  17. uses
  18.   SysUtils, Windows, Classes;
  19.  
  20. const
  21.   aac_MaxConsumers = 32;
  22.  
  23. type
  24.   PaaStreamArray = ^TaaStreamArray;
  25.   TaaStreamArray = array [0..pred(aac_MaxConsumers)] of TStream;
  26.  
  27. procedure AAThreadedMultiCopyStream(aSrcStream   : TStream;
  28.                                     aDestCount   : integer;           
  29.                                     aDestStreams : PaaStreamArray);
  30.  
  31. implementation
  32.  
  33. const
  34.   BufferSize = 1024;
  35.  
  36. type
  37.   PBuffer = ^TBuffer;
  38.   TBuffer = packed record
  39.     bToReadCount : integer;
  40.     bCount       : longint;
  41.     bBlock       : array [0..pred(BufferSize)] of byte;
  42.   end;
  43.  
  44.   PBufferArray = ^TBufferArray;
  45.   TBufferArray = array [0..1023] of PBuffer;
  46.  
  47.   TQueuedBuffers = class
  48.     private
  49.       FBufCount      : integer;
  50.       FBuffers       : PBufferArray;
  51.       FConsumerCount : integer;
  52.       FHead          : array [0..pred(aac_MaxConsumers)] of integer;
  53.       FIsNotEmpty    : array [0..pred(aac_MaxConsumers)] of THandle;
  54.                                                           {semaphores}
  55.       FIsNotFull     : THandle; {semaphore}
  56.       FTail          : integer;
  57.     protected
  58.       function qbGetHead(aInx : integer) : PBuffer;
  59.       function qbGetIsNotEmpty(aInx : integer) : THandle;
  60.       function qbGetTail : PBuffer;
  61.     public
  62.       constructor Create(aBufferCount   : integer;
  63.                          aConsumerCount : integer);
  64.       destructor Destroy; override;
  65.  
  66.       procedure AdvanceHead(aConsumerId : integer);
  67.       procedure AdvanceTail;
  68.  
  69.       property Head[aInx : integer] : PBuffer read qbGetHead;
  70.       property Tail : PBuffer read qbGetTail;
  71.  
  72.       property IsNotEmpty[aInx : integer] : THandle
  73.                   read qbGetIsNotEmpty;
  74.       property IsNotFull : THandle read FIsNotFull;
  75.  
  76.       property ConsumerCount : integer read FConsumerCount;
  77.   end;
  78.  
  79. type
  80.   TProducer = class(TThread)
  81.     private
  82.       FStream  : TStream;
  83.       FBuffers : TQueuedBuffers;
  84.     protected
  85.       procedure Execute; override;
  86.     public
  87.       constructor Create(aStream  : TStream;
  88.                          aBuffers : TQueuedBuffers);
  89.       destructor Destroy; override;
  90.   end;
  91.  
  92. type
  93.   TConsumer = class(TThread)
  94.     private
  95.       FStream  : TStream;
  96.       FBuffers : TQueuedBuffers;
  97.       FID      : integer;
  98.     protected
  99.       procedure Execute; override;
  100.     public
  101.       constructor Create(aStream  : TStream;
  102.                          aBuffers : TQueuedBuffers;
  103.                          aID      : integer);
  104.       destructor Destroy; override;
  105.   end;
  106.  
  107. {===TQueuedBuffers===================================================}
  108. constructor TQueuedBuffers.Create(aBufferCount   : integer;
  109.                                   aConsumerCount : integer);
  110. var
  111.   i : integer;
  112. begin
  113.   inherited Create;
  114.   {allocate the buffers}
  115.   FBuffers := AllocMem(aBufferCount * sizeof(pointer));
  116.   for i := 0 to pred(aBufferCount) do
  117.     GetMem(FBuffers^[i], sizeof(TBuffer));
  118.   FBufCount := aBufferCount;
  119.   {create the semaphores}
  120.   FConsumerCount := aConsumerCount;
  121.   FIsNotFull := CreateSemaphore(nil, aBufferCount, aBufferCount, '');
  122.   for i := 0 to pred(aConsumerCount) do
  123.     FIsNotEmpty[i] := CreateSemaphore(nil, 0, aBufferCount, '');
  124. end;
  125. {--------}
  126. destructor TQueuedBuffers.Destroy;
  127. var
  128.   i : integer;
  129. begin
  130.   {destroy the semaphores}
  131.   if (FIsNotFull <> 0) then
  132.     CloseHandle(FIsNotFull);
  133.   for i := 0 to pred(ConsumerCount) do
  134.     if (FIsNotEmpty[i] <> 0) then
  135.       CloseHandle(FIsNotEmpty[i]);
  136.   {free the buffers}
  137.   if (FBuffers <> nil) then begin
  138.     for i := 0 to pred(FBufCount) do
  139.       if (FBuffers^[i] <> nil) then
  140.         FreeMem(FBuffers^[i], sizeof(TBuffer));
  141.     FreeMem(FBuffers, FBufCount * sizeof(pointer));
  142.   end;
  143.   inherited Destroy;
  144. end;
  145. {--------}
  146. procedure TQueuedBuffers.AdvanceHead(aConsumerId : integer);
  147. begin
  148.   inc(FHead[aConsumerId]);
  149.   if (FHead[aConsumerId] = FBufCount) then
  150.     FHead[aConsumerId] := 0;
  151. end;
  152. {--------}
  153. procedure TQueuedBuffers.AdvanceTail;
  154. begin
  155.   inc(FTail);
  156.   if (FTail = FBufCount) then
  157.     FTail := 0;
  158. end;
  159. {--------}
  160. function TQueuedBuffers.qbGetHead(aInx : integer) : PBuffer;
  161. begin
  162.   Result := FBuffers^[FHead[aInx]];
  163. end;
  164. {--------}
  165. function TQueuedBuffers.qbGetIsNotEmpty(aInx : integer) : THandle;
  166. begin
  167.   Result := FIsNotEmpty[aInx];
  168. end;
  169. {--------}
  170. function TQueuedBuffers.qbGetTail : PBuffer;
  171. begin
  172.   Result := FBuffers^[FTail];
  173. end;
  174. {====================================================================}
  175.  
  176.  
  177. {===TProducer========================================================}
  178. constructor TProducer.Create(aStream  : TStream;
  179.                              aBuffers : TQueuedBuffers);
  180. begin
  181.   inherited Create(true);
  182.   FStream := aStream;
  183.   FBuffers := aBuffers;
  184. end;
  185. {--------}
  186. destructor TProducer.Destroy;
  187. begin
  188.   inherited Destroy;
  189. end;
  190. {--------}
  191. procedure TProducer.Execute;
  192. var
  193.   Tail : PBuffer;
  194.   i    : integer;
  195. begin
  196.   {do until the stream is exhausted...}
  197.   repeat
  198.     {get the 'queue is not full' semaphore}
  199.     WaitForSingleObject(FBuffers.IsNotFull, INFINITE);
  200.     {read a block from the stream into the head buffer}
  201.     Tail := FBuffers.Tail;
  202.     Tail^.bCount := FStream.Read(Tail^.bBlock, BufferSize);
  203.     Tail^.bToReadCount := FBuffers.ConsumerCount;
  204.     {advance the hail pointer}
  205.     FBuffers.AdvanceTail;
  206.     {as we've written a new buffer, signal all the 'queue is not
  207.      empty' semaphores}
  208.     for i := 0 to pred(FBuffers.ConsumerCount) do
  209.       ReleaseSemaphore(FBuffers.IsNotEmpty[i], 1, nil);
  210.   until (Tail^.bCount = 0);
  211. end;
  212. {====================================================================}
  213.  
  214.  
  215. {===TConsumer========================================================}
  216. constructor TConsumer.Create(aStream  : TStream;
  217.                              aBuffers : TQueuedBuffers;
  218.                              aID      : integer);
  219. begin
  220.   inherited Create(true);
  221.   FStream := aStream;
  222.   FBuffers := aBuffers;
  223.   FID := aID;
  224. end;
  225. {--------}
  226. destructor TConsumer.Destroy;
  227. begin
  228.   inherited Destroy;
  229. end;
  230. {--------}
  231. procedure TConsumer.Execute;
  232. var
  233.   Head      : PBuffer;
  234.   NumToRead : integer;
  235. begin
  236.   {get our 'queue is not empty' semaphore}
  237.   WaitForSingleObject(FBuffers.IsNotEmpty[FID], INFINITE);
  238.   {get the head buffer}
  239.   Head := FBuffers.Head[FID];
  240.   {while the head buffer is not empty...}
  241.   while (Head^.bCount <> 0) do begin
  242.     {write a block from the head buffer into the stream}
  243.     FStream.Write(Head^.bBlock, Head^.bCount);
  244.     {we've finished with this buffer, so safely decrement the count of
  245.      consumers who have still to read this buffer}
  246.     NumToRead := InterlockedDecrement(Head^.bToReadCount);
  247.     {advance our head pointer}
  248.     FBuffers.AdvanceHead(FID);
  249.     {if we were the last consumer to read this buffer...}
  250.     if (NumToRead = 0) then
  251.       {signal the 'queue is not full' semaphore}
  252.       ReleaseSemaphore(FBuffers.IsNotFull, 1, nil);
  253.     {get our 'queue is not empty' semaphore}
  254.     WaitForSingleObject(FBuffers.IsNotEmpty[FID], INFINITE);
  255.     {get the head buffer}
  256.     Head := FBuffers.Head[FID];
  257.   end;
  258. end;
  259. {====================================================================}
  260.  
  261.  
  262. {===Interfaced routine===============================================}
  263. procedure AAThreadedMultiCopyStream(aSrcStream   : TStream;
  264.                                     aDestCount   : integer;
  265.                                     aDestStreams : PaaStreamArray);
  266. var
  267.   i : integer;
  268.   Buffers   : TQueuedBuffers;
  269.   Producer  : TProducer;
  270.   Consumer  : array [0..pred(aac_MaxConsumers)] of TConsumer;
  271.   WaitArray : array [0..aac_MaxConsumers] of THandle;
  272. begin
  273.   Buffers := nil;
  274.   Producer := nil;
  275.   for i := 0 to pred(aac_MaxConsumers) do
  276.     Consumer[i] := nil;
  277.   for i := 0 to aac_MaxConsumers do
  278.     WaitArray[i] := 0;
  279.   try
  280.     {create the queued buffer object}
  281.     Buffers := TQueuedBuffers.Create(20, aDestCount);
  282.     {create the producer thread, save its handle}
  283.     Producer := TProducer.Create(aSrcStream, Buffers);
  284.     WaitArray[0] := Producer.Handle;
  285.     {create the consumer threads, save their handles}
  286.     for i := 0 to pred(aDestCount) do begin
  287.       Consumer[i] := TConsumer.Create(aDestStreams^[i], Buffers, i);
  288.       WaitArray[i+1] := Consumer[i].Handle;
  289.     end;
  290.     {start the threads up}
  291.     for i := 0 to pred(aDestCount) do
  292.       Consumer[i].Resume;
  293.     Producer.Resume;
  294.     {wait for the threads to finish}
  295.     WaitForMultipleObjects(1+aDestCount, @WaitArray, true, INFINITE);
  296.   finally
  297.     Producer.Free;
  298.     for i := 0 to pred(aDestCount) do
  299.       Consumer[i].Free;
  300.     Buffers.Free;
  301.   end;
  302. end;
  303. {====================================================================}
  304.  
  305. end.
  306.